[data] randomize_block_order() not compatible with stage fusion #26090
Conversation
output.append(s)
output.extend(reorder_buf)
return output
Nice! This is a good one-off.
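The fragment above is part of a buffer-based reordering pass. As a rough, self-contained sketch of the idea (the Stage class and kind names here are illustrative stand-ins, not Ray's actual classes): randomize stages are held in a buffer and pushed past any run of one-to-one stages, flushing the buffer whenever a stage that cannot be reordered past (another all-to-all stage) is reached.

```python
from dataclasses import dataclass


@dataclass
class Stage:
    """Toy stand-in for a Dataset stage (not Ray's real class)."""
    name: str
    kind: str  # "randomize", "one_to_one", or "all_to_all"


def reorder_stages(stages):
    output = []
    reorder_buf = []  # randomize stages being deferred past 1-1 stages
    for s in stages:
        if s.kind == "randomize":
            # Defer the randomize stage past subsequent 1-1 stages so
            # read->map_batches fusion is not broken.
            reorder_buf.append(s)
        else:
            if s.kind == "all_to_all":
                # A randomize stage cannot be pushed past another
                # all-to-all stage, so flush the buffer before it.
                output.extend(reorder_buf)
                reorder_buf = []
            output.append(s)
    output.extend(reorder_buf)  # flush any trailing randomize stages
    return output
```

With this ordering, a pipeline like randomize -> map_batches -> repartition becomes map_batches -> randomize -> repartition, keeping the read and map_batches stages adjacent for fusion.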
)
return Dataset(plan, self._epoch, self._lazy)
plan = self._plan.with_stage(RandomizeBlocksStage(seed))
return Dataset(plan, self._epoch, self._lazy, defer_execution=True)
Hmm, having both lazy and defer_execution is a bit odd. Why can't this follow the self._lazy semantics? Is there an issue with executing this stage eagerly?
Good point. I can't remember why I did this... removed it.
Oh, it breaks lazy read->map_batches() fusion, that's why. I reverted this since it broke a unit test.
Hmm do you know why it breaks that?
@@ -329,6 +329,8 @@ def _optimize(self) -> Tuple[BlockList, DatasetStats, List[Stage]]:
         """
         context = DatasetContext.get_current()
         blocks, stats, stages = self._get_source_blocks_and_stages()
+        if context.optimize_reorder_stages:
+            stages = _reorder_stages(stages)
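The hunk above gates the reorder pass behind a context flag so the optimization can be toggled off. A minimal sketch of that gating, using a toy DatasetContext and a deliberately simplified stand-in for the reorder pass (all names here are illustrative, not Ray's actual API):

```python
from dataclasses import dataclass


@dataclass
class DatasetContext:
    """Toy stand-in for the global context carrying optimizer flags."""
    optimize_reorder_stages: bool = True


def _reorder_stages(stages):
    # Deliberately simplified: just move "randomize" entries after the
    # other stages; the real pass respects all-to-all barriers.
    return [s for s in stages if s != "randomize"] + [
        s for s in stages if s == "randomize"
    ]


def optimize(stages, context):
    # The flag lets users disable the reordering if it ever misbehaves.
    if context.optimize_reorder_stages:
        stages = _reorder_stages(stages)
    return stages
```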
@ericl If you rewrite the read stage before you reorder stages, won't the read->map_batches fusion work without the lazy vs defer_execution hack?
No, because if you call .randomize_blocks() eagerly it will materialize the read blocks, so it's too late. We have to force it to be lazy as a special case.
I also need this for auto_repartition() anyway, so I think this is a sensible thing... and if we move to lazy by default it will go away.
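The point above is that an eager Dataset normally executes its plan on construction, which would materialize the read blocks before the optimizer gets a chance to fuse read with map_batches. A minimal sketch of how a defer_execution flag sidesteps that (FakePlan and this Dataset are illustrative stand-ins, not Ray's real classes):

```python
class FakePlan:
    """Toy execution plan that records whether it was executed."""

    def __init__(self):
        self.executed = False

    def execute(self):
        self.executed = True


class Dataset:
    def __init__(self, plan, epoch, lazy, *, defer_execution=False):
        self._plan = plan
        self._epoch = epoch
        self._lazy = lazy
        if not lazy and not defer_execution:
            # Eager datasets normally execute immediately. Deferring keeps
            # the randomize stage un-materialized so the optimizer can
            # still reorder it past 1-1 stages and preserve fusion.
            self._plan.execute()
```

Under these assumptions, defer_execution=True makes a single eager construction behave lazily without flipping the dataset into fully lazy mode, which is why it reads as a special case that disappears once lazy becomes the default.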
@@ -169,6 +174,8 @@ def __init__(
         plan: ExecutionPlan,
         epoch: int,
         lazy: bool,
+        *,
+        defer_execution: bool = False,
I had a hard time understanding why there's a defer_execution when there is already a lazy parameter. It seems to muddy the execution semantics even more.
Why are these changes needed?
Per the discussion in #26057, fix the stage fusion issue by re-ordering the randomize stage past any 1-1 stages.
Closes #26057